Refactor PubSub acks to independent requests #37

Merged: 2 commits, Dec 7, 2023

Collaborator

@rnarubin rnarubin commented Dec 7, 2023

Previously, pubsub acknowledgements were sent over the client stream of
the streaming pull gRPC. This provided limited feedback for the
ack/nack/modify callers, who had no (good) way to know whether their
request was actually submitted. Furthermore, there is evidence of a bug
around connection resets and long ack times, although no definitive
cause was identified.

This change uses explicit `acknowledge` and `modify_ack_deadline` RPC
calls to submit acks instead of the client stream. These enable much
clearer feedback for ack callers, as well as better backpressure
regulation in general. This implementation was loosely inspired by the
approach in the Go pubsub library[1].

[1]: https://github.com/googleapis/google-cloud-go/blob/94d040898cc9e85fdac76560765b01cfd019d0b4/pubsub/iterator.go#L422-L446
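
To illustrate the "clearer feedback" point, here is a minimal sketch of an ack request that carries its own completion channel, so the caller learns the outcome of its specific request. It is a std-only, thread-based model rather than the async code in this PR; `AckHandle`, `AckRequest`, `AckError`, and `spawn_ack_worker` are hypothetical names, the `rpc` closure stands in for a real `acknowledge` call, and batching is not modelled.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical error type standing in for an RPC failure.
#[derive(Debug, Clone, PartialEq)]
pub struct AckError(pub String);

/// One ack request: the ID to acknowledge plus a channel for reporting the outcome.
struct AckRequest {
    ack_id: String,
    completion: Sender<Result<(), AckError>>,
}

/// Handle held by the message consumer (hypothetical).
pub struct AckHandle {
    requests: Sender<AckRequest>,
}

impl AckHandle {
    /// Submit an ack and block until the background worker reports the outcome.
    pub fn ack(&self, ack_id: &str) -> Result<(), AckError> {
        let (completion, listener) = mpsc::channel();
        self.requests
            .send(AckRequest { ack_id: ack_id.to_owned(), completion })
            .map_err(|_| AckError("ack worker has shut down".into()))?;
        // recv() fails only if the worker dropped the sender without replying
        listener
            .recv()
            .unwrap_or_else(|_| Err(AckError("ack worker dropped the request".into())))
    }
}

/// Spawn a worker that issues one stand-in RPC per request and reports its result.
pub fn spawn_ack_worker<F>(rpc: F) -> AckHandle
where
    F: Fn(&str) -> Result<(), AckError> + Send + 'static,
{
    let (requests, incoming): (Sender<AckRequest>, Receiver<AckRequest>) = mpsc::channel();
    thread::spawn(move || {
        for request in incoming {
            let outcome = rpc(&request.ack_id);
            // the caller may have stopped listening; ignoring that is fine
            let _ = request.completion.send(outcome);
        }
    });
    AckHandle { requests }
}
```

With the old stream-based approach there was no per-request result to hand back; in this model every `ack` call resolves to the `Result` of the request that carried it.
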
@rnarubin rnarubin requested a review from jneem December 7, 2023 03:25
@rnarubin rnarubin self-assigned this Dec 7, 2023
@rnarubin
Collaborator Author

rnarubin commented Dec 7, 2023

cc @blogle @mattc1170

```rust
// now create the future to actually wait on the outcome
async move {
    match send_result {
        Ok(()) => match listener.await {
```

When the oneshot::Receiver is used as a Future like here, does that mean it waits indefinitely? There's no need to timeout here?

Collaborator Author

right, it waits until there is a response or until the sender half is dropped. A user can add a timeout around ack() in their application code if they'd like
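
The three possible outcomes (a response arrives, the sender half is dropped, or the caller gives up waiting) can be sketched with a synchronous stand-in. This uses `std::sync::mpsc` and `recv_timeout` instead of the async oneshot channel in the PR; `WaitOutcome` and `wait_for_ack` are hypothetical names. In async application code the equivalent would presumably be a timeout wrapper such as `tokio::time::timeout` around the `ack()` future.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Outcome of waiting on an ack's completion channel with a deadline.
#[derive(Debug, PartialEq)]
pub enum WaitOutcome {
    /// The worker replied with the result of the request.
    Completed(Result<(), String>),
    /// The sender half was dropped before any reply was sent.
    SenderDropped,
    /// The caller-imposed deadline elapsed first.
    TimedOut,
}

/// Wait for a reply, but no longer than `limit`.
pub fn wait_for_ack(
    listener: &mpsc::Receiver<Result<(), String>>,
    limit: Duration,
) -> WaitOutcome {
    match listener.recv_timeout(limit) {
        Ok(result) => WaitOutcome::Completed(result),
        Err(RecvTimeoutError::Disconnected) => WaitOutcome::SenderDropped,
        Err(RecvTimeoutError::Timeout) => WaitOutcome::TimedOut,
    }
}
```

Without the deadline, only the first two outcomes exist, which matches the behavior described above.
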

@mattc1170

Once this commit is landed, we also need to update hedwig-rust to change the ya-gcp dependency to 0.11.1, right?

Collaborator

@jneem jneem left a comment

lgtm!

```rust
        self.inner.clone(),
        self.inner.clone(),
        self.inner.clone(),
    ],
```
Collaborator

🙃

```rust
    .drain(..)
    .map(|TokenFeedback { completion, .. }| completion);

// peel off the first to avoid cloning in the common single-message case
```
Collaborator

I was going to ask "isn't the response tiny anyway?", but tonic::Status is surprisingly big...
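
For readers unfamiliar with the "peel off the first" trick, a minimal sketch follows. It assumes one shared result must be delivered to every listener in a batch; `notify_all` is a hypothetical helper and std channels stand in for the oneshot senders in the PR. The first listener receives the original value by move, so a batch of one performs no clone at all.

```rust
use std::sync::mpsc::Sender;

/// Deliver `result` to every listener, cloning it only for listeners beyond the first.
pub fn notify_all<T: Clone>(listeners: Vec<Sender<T>>, result: T) {
    let mut listeners = listeners.into_iter();
    // peel off the first listener; it receives the original value by move
    let first = match listeners.next() {
        Some(first) => first,
        None => return, // nobody to notify
    };
    // every remaining listener gets a clone (zero iterations for a single message)
    for listener in listeners {
        // a listener that has gone away is not an error for the others
        let _ = listener.send(result.clone());
    }
    let _ = first.send(result);
}
```

For N listeners this performs N - 1 clones rather than N, which matters when the value is large, as the comment above notes for `tonic::Status`.
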

```rust
let (acks, acks_rx) = mpsc::unbounded_channel();
let (nacks, nacks_rx) = mpsc::unbounded_channel();
let (modacks, modacks_rx) = mpsc::unbounded_channel();
let ack_router = Arc::new(AckRouter { acks, nacks, modacks });
```
Collaborator

Since AckRouter is just 3 senders, why not just clone it instead of putting it behind an Arc?

Collaborator Author

Yeah, I went back and forth on this decision. The channels are basically `Arc<InnerThing>` themselves, so it's effectively a question of

  1. Clone 1 Arc, have some indirection on ack/nack/modack, ack tokens hold 1 (additional) pointer
  2. Clone 3 Arcs, less indirection in ack/nack/modack, ack tokens hold 3 pointers

I went with 1 Arc on the premise that users may go through hundreds of thousands of messages and tokens, so making the tokens a little smaller and having fewer atomic inc/dec calls is a better trade. But the difference is minuscule either way
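
The trade-off can be made concrete with a small size comparison. This is a sketch using std channel senders as stand-ins, so the byte counts will differ from the channel type actually used in the PR; `TokenShared`, `TokenOwned`, `token_sizes`, and `new_router` are hypothetical names.

```rust
use std::mem::size_of;
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;

/// Stand-in for the router: three senders, one per request kind.
pub struct AckRouter {
    pub acks: Sender<String>,
    pub nacks: Sender<String>,
    pub modacks: Sender<String>,
}

/// Option 1: each token holds one shared pointer to the router.
pub struct TokenShared {
    pub router: Arc<AckRouter>,
}

/// Option 2: each token holds its own clone of all three senders.
pub struct TokenOwned {
    pub acks: Sender<String>,
    pub nacks: Sender<String>,
    pub modacks: Sender<String>,
}

/// Returns (size of a shared-router token, size of an owned-senders token) in bytes.
pub fn token_sizes() -> (usize, usize) {
    (size_of::<TokenShared>(), size_of::<TokenOwned>())
}

/// Build a router behind an `Arc`. The receivers are dropped immediately because
/// this sketch only measures sizes and reference counts; it never sends anything.
pub fn new_router() -> Arc<AckRouter> {
    let (acks, _) = mpsc::channel();
    let (nacks, _) = mpsc::channel();
    let (modacks, _) = mpsc::channel();
    Arc::new(AckRouter { acks, nacks, modacks })
}
```

Creating a `TokenShared` touches a single reference count, while a `TokenOwned` would clone three senders and update a counter inside each.
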

@rnarubin
Collaborator Author

rnarubin commented Dec 7, 2023

> we also need to update hedwig-rust to change the ya-gcp dependency to 0.11.1, right?

Because it's a semver-compatible version change, you can just update ya-gcp in the end application's lock file: `cargo update -p ya-gcp --precise 0.11.1`

@rnarubin rnarubin merged commit e257431 into standard-ai:master Dec 7, 2023
11 checks passed
@rnarubin rnarubin deleted the independent-acks branch December 7, 2023 18:08
rnarubin added a commit to rnarubin/ya-gcp that referenced this pull request Dec 8, 2023
The changes in standard-ai#37 did not attempt retries on the explicit ack requests.
These appear to fail in practice periodically (~1hr), perhaps due to
some connection resets from the server.

This change introduces retries to those requests
rnarubin added a commit that referenced this pull request Dec 11, 2023
The changes in #37 did not attempt retries on the explicit ack requests.
These appear to fail in practice periodically (~1hr), perhaps due to
some connection resets from the server.

This change introduces retries to those requests
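
The retry policy used in the follow-up commit is not shown on this page. As a generic illustration only, a bounded retry loop with exponential backoff could look like the sketch below; `retry_with_backoff` and its parameters are hypothetical, and the blocking `thread::sleep` stands in for an async delay.

```rust
use std::thread;
use std::time::Duration;

/// Run `op` until it succeeds or `max_attempts` attempts have been made,
/// doubling the delay between attempts. At least one attempt is always made.
pub fn retry_with_backoff<T, E, F>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // out of attempts: surface the last error to the caller
            Err(error) if attempt >= max_attempts => return Err(error),
            Err(_) => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

A production policy would likely also distinguish retryable failures (such as a connection reset) from permanent ones and add jitter to the delay; neither is modelled here.
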